1.准备一个测试函数
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool();
for (int i = 0; i < 5; i++) {
myThreadPool.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println("主线程未被阻塞...");
}
MyThreadPool是我们的自定义线程池,其中包含一个**execute()**方法,用来执行我们的方法。
2.编写MyThreadPool
void execute(Runnable command) {
}
什么时候创建线程? 线程是否可以复用? command该怎么保存
1. 使用ArrayList保存任务
当只有一个线程执行任务时,使用List保存runnable。 将接收到的runnable存入一个集合内 需要执行时从集合中取出
List<Runnable> commandList = new ArrayList<>();
Thread thread = new Thread(() -> {
while (true) {
if (!commandList.isEmpty()) {
Runnable command = commandList.remove(0);
command.run();
}
}
});
void execute(Runnable command) {
commandList.add(command);
}
使用list时 当list中没有元素时 while (true) 会浪费cpu资源, 需要继续优化
2. 替换为使用阻塞队列保存任务
BlockingQueue<Runnable> commandList = new ArrayBlockingQueue<>(1024);
Thread thread = new Thread(() -> {
while (true) {
try {
Runnable take = commandList.take();
take.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "唯一线程");
{
thread.start();
}
void execute(Runnable command) {
// 在往BlockingQueue中添加元素时不要使用add, 而是换成offer,offer有一个返回值
// 返回是否添加成功的结果
boolean offer = commandList.offer(command);
}
当队列中没有任务时,**take()**函数会阻塞,而不会浪费CPU资源.
执行main函数,得到如下结果
主线程未被阻塞...
唯一线程
唯一线程
唯一线程
唯一线程
唯一线程
3. 增加线程Thread的数量
我们的线程池应该有多少个线程?
首先将command提取出来,作为一个task
public final Runnable task = () -> {
while (true) {
try {
Runnable take = commandList.take();
take.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
接下来定义一个变量 保存线程的数量,使用List保存线程
private int corePoolSize = 10;
List<Thread> threadList = new ArrayList<>();
// 判断threadList中元素的数量,如果没有达到corePoolSize,则继续创建线程
void execute(Runnable command) {
if (threadList.size() < corePoolSize) {
Thread thread = new Thread(task);
threadList.add(thread);
thread.start();
}
boolean offer = commandList.offer(command);
}
4.增加辅助线程处理任务
如果offer返回的是false, 说明threadList的容量已经满了,需要创建一个新的List来执行其他的任务
private int corePoolSize = 10;
private int maxSize = 16;
List<Thread> coreList = new ArrayList<>();
List<Thread> supportList = new ArrayList<>();
void execute(Runnable command) {
if (coreList.size() < corePoolSize) {
Thread thread = new Thread(task);
coreList.add(thread);
thread.start();
}
boolean offer = commandList.offer(command);
if (!offer) {
Thread thread = new Thread(task);
supportList.add(thread);
thread.start();
}
}
修改一下代码,当核心线程数量加上辅助线程数量小于最大线程数量时,才允许添加到辅助线程List.
上面的代码有线程安全问题,辅助线程在创建后并不是执行之前未放到阻塞队列中的任务
并且即使创建了辅助线程 此时的阻塞队列依然可能是满的
所以在创建完辅助线程后,需要重新将command再放到辅助线程中
如果此时还是失败,说明阻塞队列满了
void execute(Runnable command) {
if (coreList.size() < corePoolSize) {
Thread thread = new Thread(task);
coreList.add(thread);
thread.start();
}
if (commandList.offer(command)) {
return;
}
// 当核心线程数量加上辅助线程数量小于最大线程数量时,可以往创建辅助线程
if (coreList.size() + supportList.size() < maxSize) {
Thread thread = new Thread(task);
supportList.add(thread);
thread.start();
}
if (!commandList.offer(command)) {
throw new RuntimeException("阻塞队列满了");
}
}
5.辅助线程的结束时机
在线程池不忙碌的时候,应该将辅助线程释放,节省空间.
使用BlockingQueue 的 poll 函数,传入一个时间和一个时间单位.
规定一个超时时间,如果在这个时间内仍然没有拿到任务,说明线程池不忙碌
BlockingQueue<Runnable> commandList = new ArrayBlockingQueue<>(1024);
public final Runnable coreTask = () -> {
while (true) {
try {
Runnable take = commandList.take();
take.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
public final Runnable supportTask = () -> {
while (true) {
try {
Runnable take = commandList.poll(1, TimeUnit.SECONDS);
if (take == null) {
break;
}
take.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("辅助线程" + Thread.currentThread().getName() + "结束了");
};
void execute(Runnable command) {
if (coreList.size() < corePoolSize) {
Thread thread = new Thread(coreTask);
coreList.add(thread);
thread.start();
}
if (commandList.offer(command)) {
return;
}
if (coreList.size() + supportList.size() < maxSize) {
Thread thread = new Thread(supportTask);
supportList.add(thread);
thread.start();
}
if (!commandList.offer(command)) {
throw new RuntimeException("阻塞队列满了");
}
}
此时coreList执行的是coreTask, supportList执行的是supportTask
6.优化参数
将MyTheadPool中需要的参数整理一下,生成构造方法,由外部传入, 同时优化任务.
BlockingQueue<Runnable> commandList = new ArrayBlockingQueue<>(1024);
// 定义一个变量 保存线程的数量
private final int corePoolSize;
private final int maxSize;
private final int timeout;
private final TimeUnit timeUnit;
List<Thread> coreList = new ArrayList<>();
List<Thread> supportList = new ArrayList<>();
public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit) {
this.corePoolSize = corePoolSize;
this.maxSize = maxSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
}
void execute(Runnable command) {
if (coreList.size() < corePoolSize) {
Thread thread = new CoreThread();
coreList.add(thread);
thread.start();
}
if (commandList.offer(command)) {
return;
}
// 当核心线程数量加上辅助线程数量小于最大线程数量时,可以往创建辅助线程
if (coreList.size() + supportList.size() < maxSize) {
Thread thread = new SupportThread();
supportList.add(thread);
thread.start();
}
if (!commandList.offer(command)) {
throw new RuntimeException("阻塞队列满了");
}
}
class CoreThread extends Thread {
@Override
public void run() {
while (true) {
try {
Runnable take = commandList.take();
take.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
class SupportThread extends Thread {
@Override
public void run() {
while (true) {
try {
Runnable take = commandList.poll(timeout, timeUnit);
if (take == null) {
break;
}
take.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("辅助线程" + Thread.currentThread().getName() + "结束了");
}
}
同时可以把阻塞队列也交给调用方,动态指定队列的大小
private final int corePoolSize;
private final int maxSize;
private final int timeout;
private final TimeUnit timeUnit;
public final BlockingQueue<Runnable> commandList;
public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit, BlockingQueue<Runnable> commandList) {
this.corePoolSize = corePoolSize;
this.maxSize = maxSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.commandList = commandList;
}
此时的main方法修改为下面的形式
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool(2, 4, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
for (int i = 0; i < 6; i++) {
myThreadPool.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println("主线程未被阻塞...");
}
此时执行main方法的结果如下
Exception in thread "main" java.lang.RuntimeException: 阻塞队列满了
at com.carlos.maintenance.insight.MyThreadPool.execute(MyThreadPool.java:86)
at com.carlos.maintenance.insight.Main.main(Main.java:12)
Thread-0
Thread-2
Thread-1
Thread-3
Thread-0
辅助线程Thread-2结束了
辅助线程Thread-3结束了
6个任务,只执行了5个,阻塞队列满了丢弃了一个任务,当阻塞队列满了的时候只是抛出了一个异常,
需要优化一下这一部分,作为一个拒绝策略
7.拒绝策略
新建一个接口,处理失败的任务
public interface RejectHandle {
// 失败的任务 和 线程池本身
void reject(Runnable rejectCommand, MyThreadPool threadPool);
}
将拒绝策略传入到线程池中
private final int corePoolSize;
private final int maxSize;
private final int timeout;
private final TimeUnit timeUnit;
public final BlockingQueue<Runnable> commandList;
private final RejectHandle rejectHandle;
public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit, BlockingQueue<Runnable> commandList, RejectHandle rejectHandle) {
this.corePoolSize = corePoolSize;
this.maxSize = maxSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.commandList = commandList;
this.rejectHandle = rejectHandle;
}
if (!commandList.offer(command)) {
rejectHandle.reject(command, this);
}
假如在触发拒绝策略时想抛出一个异常
public class ThrowRejectHandle implements RejectHandle{
@Override
public void reject(Runnable rejectCommand, MyThreadPool threadPool) {
System.out.println("阻塞队列满了");
}
}
此时的main方法为:
MyThreadPool myThreadPool = new MyThreadPool(2, 4, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThrowRejectHandle());
如果想在触发拒绝策略时拿出一个任务扔掉,放进这个失败的任务
public class DiscardRejectHandle implements RejectHandle{
@Override
public void reject(Runnable rejectCommand, MyThreadPool threadPool) {
threadPool.commandList.poll();
threadPool.execute(rejectCommand);
}
}
main方法为
MyThreadPool myThreadPool = new MyThreadPool(2, 4, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new DiscardRejectHandle());
此时的执行结果为:
主线程未被阻塞...
Thread-1
Thread-0
Thread-3
Thread-2
Thread-1
辅助线程Thread-2结束了
辅助线程Thread-3结束了
此时说明6个任务被扔掉了一个.